package com.fly.demo.tcp.base;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.fly.core.utils.JsonBeanUtils;
import com.fly.demo.tcp.entity.Command;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TcpClient implements Runnable
{
    private String ip;
    
    private int port;
    
    private Socket socket;
    
    private DataOutputStream dataOutputStream;
    
    private String clientName;
    
    private boolean isClientCoreRun = false;
    
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    
    private ExecutorService executor = Executors.newFixedThreadPool(2);
    
    public TcpClient(String clientName)
    {
        super();
        this.clientName = clientName;
    }
    
    public String getClientName()
    {
        return clientName;
    }
    
    /**
     * 
     * @param ip 服务端IP
     * @param port 服务端PORT
     * @return
     */
    public boolean connectServer(String ip, int port)
    {
        try
        {
            this.ip = ip;
            this.port = port;
            socket = new Socket(InetAddress.getByName(ip), port);
            log.info("****** TcpClient will connect to Server {}:{}", ip, port);
            scheduler.scheduleAtFixedRate(this::checkConnection, 0, 10, TimeUnit.SECONDS);
            isClientCoreRun = true;
            dataOutputStream = new DataOutputStream(socket.getOutputStream());
            dataOutputStream.writeUTF(clientName);
            dataOutputStream.flush();
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
            isClientCoreRun = false;
        }
        return isClientCoreRun;
    }
    
    /**
     * 检查TCP连接
     */
    private void checkConnection()
    {
        if (socket == null || socket.isClosed())
        {
            log.error("Connection lost, attempting to reconnect");
            reconnect();
        }
    }
    
    private void reconnect()
    {
        try
        {
            socket = new Socket(InetAddress.getByName(ip), port);
            log.info("****** TcpClient will connect to Server {}:{}", ip, port);
            isClientCoreRun = true;
            executor.execute(new ReceiveMsg());
            dataOutputStream = new DataOutputStream(socket.getOutputStream());
            dataOutputStream.writeUTF(clientName);
            dataOutputStream.flush();
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
            isClientCoreRun = false;
        }
    }
    
    /**
     * 发送报文
     */
    public void sendMsg(String msg)
    {
        try
        {
            dataOutputStream.writeUTF(msg);
            dataOutputStream.flush();
        }
        catch (IOException e)
        {
            log.error(e.getMessage());
            closeClientConnect();
        }
    }
    
    /**
     * 断开客户端与服务端的连接
     */
    public void closeClientConnect()
    {
        if (dataOutputStream != null)
        {
            try
            {
                dataOutputStream.close();
                isClientCoreRun = false;
                if (socket != null)
                {
                    socket.close();
                }
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
            }
        }
    }
    
    @Override
    public void run()
    {
        executor.execute(new ReceiveMsg());
    }
    
    class ReceiveMsg implements Runnable
    {
        private DataInputStream dataInputStream;
        
        public ReceiveMsg()
        {
            try
            {
                // 数据输入流
                dataInputStream = new DataInputStream(socket.getInputStream());
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
            }
        }
        
        @Override
        public void run()
        {
            try
            {
                // server停止后, 会影响接受消息线程工作
                while (isClientCoreRun)
                {
                    String msg = dataInputStream.readUTF();
                    log.info("{} get msg: {}", clientName, msg);
                    if (msg.contains("commandId"))
                    {
                        Command cmd = JsonBeanUtils.jsonToBean(msg, Command.class);
                        sendMsg("hello Command！" + JsonBeanUtils.beanToJson(cmd));
                    }
                }
            }
            catch (IOException e)
            {
                log.error(e.getMessage());
                // 防止重连失败
                closeClientConnect();
            }
        }
    }
}
